/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.msu.client.register;

import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.iec.edp.caf.common.JSONSerializer;
import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.msu.api.ServiceUnitAwareService;
import io.iec.edp.caf.msu.api.client.ServiceRegistry;
import io.iec.edp.caf.msu.api.entity.MsuConstVariable;
import io.iec.edp.caf.msu.api.entity.MsuProperties;
import io.iec.edp.caf.msu.api.entity.ServiceUnitRegisterInfo;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.util.*;
import java.util.function.Supplier;

/**
 * Service center - service registration (Kubernetes edition).
 *
 * <p>Every service unit (SU) contained in a registration request is published as a
 * Kubernetes {@code ClusterIP} Service. The lower-cased application name is used as the
 * namespace, the lower-cased SU name as the Service name, and the configured service name
 * (the deployment unit) as the pod selector value.
 *
 * <p>Lazy client creation and the list of recorded services are not synchronized.
 *
 * @author Leon Huo
 * @date 2023-04-26 14:40
 */
@Slf4j
public class KubernetesRegisterImpl implements ServiceRegistry {

    private final MsuProperties configuration;

    // Stored by the constructor; not read anywhere in this class.
    private final ServiceUnitAwareService suAware;

    // Namespace of the most recent register(...) call; also used by unRegister().
    private String namespace = "";

    // Created lazily by getK8sClient(); released again at the end of unRegister().
    private KubernetesClient client;

    // Services created or found during registration; inspected by unRegister().
    private final List<Service> services = new ArrayList<>();

    public KubernetesRegisterImpl(MsuProperties configuration, ServiceUnitAwareService suAware) {
        this.configuration = configuration;
        this.suAware = suAware;
    }

    /**
     * Application registration. Registers every service unit of {@code registerInfo},
     * exposing the port reported by {@link CafEnvironment#getPort()}.
     *
     * @param registerInfo application name and the service units to register
     * @return always {@code true}; failures surface as exceptions from the Kubernetes client
     */
    @Override
    public Boolean register(ServiceUnitRegisterInfo registerInfo) {
        // DockerFile Expose port == server.port. Resolved lazily, only when a service
        // actually has to be created.
        return registerServiceUnits(registerInfo, CafEnvironment::getPort);
    }

    /**
     * Application registration with an explicitly supplied port.
     *
     * @param registerInfo application name and the service units to register
     * @param ip           not used by this implementation
     * @param port         port (and target port) of the services that get created
     * @return always {@code true}; failures surface as exceptions from the Kubernetes client
     */
    @Override
    public Boolean register(ServiceUnitRegisterInfo registerInfo, String ip, Integer port) {
        return registerServiceUnits(registerInfo, () -> port);
    }

    /**
     * Removes the recorded services that no longer have any endpoint subsets, then
     * releases the Kubernetes client. Also runs as the {@code @PreDestroy} hook.
     *
     * @return always {@code true}; failures surface as exceptions from the Kubernetes client
     */
    @Override
    @PreDestroy
    public Boolean unRegister() {
        log.info("ServiceCenter(K8s) Start to unregister su of namespace [{}]", this.namespace);

        try {
            for (Service svc : this.services) {
                String svcName = svc.getMetadata().getName();

                //query endpoints by service name (in k8s: service name == endpoints name)
                Endpoints endpoints = getK8sClient().endpoints().inNamespace(this.namespace).withName(svcName).get();

                //delete service if there is no Endpoints object or its subsets are null or empty
                if (hasNoSubsets(endpoints)) {
                    Boolean deleted = getK8sClient().services().inNamespace(this.namespace).withName(svcName).delete();

                    log.info("ServiceCenter(K8s) Unregister [{}] of namespace [{}], result [{}]", svcName, this.namespace, deleted);
                }
            }
        } finally {
            // Release the client's resources; getK8sClient() re-creates it on demand.
            closeK8sClient();
        }

        log.info("ServiceCenter(K8s) Finish to unregister su of namespace [{}]", this.namespace);
        return true;
    }

    /**
     * Shared implementation of both {@code register} overloads: creates one ClusterIP
     * service per service unit unless a service with that name already exists.
     *
     * @param registerInfo application name and the service units to register
     * @param portSupplier queried once for each service that has to be created
     * @return always {@code true}
     */
    private Boolean registerServiceUnits(ServiceUnitRegisterInfo registerInfo, Supplier<Integer> portSupplier) {
        // k8s namespace must be lower case; namespace as application name.
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        this.namespace = registerInfo.getAppName().toLowerCase(Locale.ROOT);
        String ns = this.namespace;

        //service name as deployment unit,
        //one su only belongs to one deployment unit. one deployment unit can have multiple copies.
        String serviceName = this.configuration.getServiceName();

        String basePath = CafEnvironment.getBaseUrlPath();

        log.info("ServiceCenter(K8s) Start to register su of Namespace [{}] and ServiceName [{}]", ns, serviceName);

        //forEach su to register service
        registerInfo.getServiceUnitInfo().forEach(suInfo -> {
            String name = suInfo.getName().toLowerCase(Locale.ROOT);
            KubernetesClient k8s = getK8sClient();

            Service existing = k8s.services().inNamespace(ns).withName(name).get();
            if (existing != null) {
                this.services.add(existing);

                log.info("ServiceCenter(K8s) Exist k8s service of [{}]", name);
                return;
            }

            Integer port = portSupplier.get();

            //create service
            Service svc = new ServiceBuilder()
                    .withNewMetadata()
                    .withName(name)
                    .addToLabels(MsuConstVariable.K8S_MSU_LABEL, name)       //for ServiceDiscover
                    .addToLabels(MsuConstVariable.K8S_NAMESPACE_LABEL, ns)
                    .addToLabels(MsuConstVariable.K8S_DEPLOYMENTUNIT_LABEL, serviceName)
                    .addToAnnotations(MsuConstVariable.MSU_DESCRIPTION, JSONSerializer.serialize(suInfo))
                    .addToAnnotations(MsuConstVariable.MSU_BASEPATH, basePath)
                    .endMetadata()
                    .withNewSpec()
                    .withSelector(Collections.singletonMap(MsuConstVariable.CAF_POD_LABEL, serviceName))      //bind pod
                    .addNewPort()
                    .withProtocol("TCP")
                    .withPort(port)
                    .withTargetPort(new IntOrString(port))
                    .endPort()
                    .withType("ClusterIP")
                    .endSpec()
                    .build();

            //register service
            svc = k8s.services().inNamespace(ns).withName(name).create(svc);

            //record svc
            this.services.add(svc);

            log.info("ServiceCenter(K8s) Success to register k8s service of [{}]", name);
        });

        log.info("ServiceCenter(K8s) Finish register su");
        return true;
    }

    /**
     * Tells whether a service has nothing behind it.
     *
     * @param endpoints the Endpoints object looked up for a service; may be {@code null}
     * @return {@code true} if {@code endpoints} is {@code null} or has no subsets
     */
    private static boolean hasNoSubsets(Endpoints endpoints) {
        return endpoints == null
                || endpoints.getSubsets() == null
                || endpoints.getSubsets().isEmpty();
    }

    /**
     * initialize k8s client
     */
    private KubernetesClient getK8sClient() {
        if (this.client == null) {
            //initialize k8s client
            log.info("init k8s client");
            this.client = new DefaultKubernetesClient();
            log.info("finish to init k8s client");
        }

        return this.client;
    }

    /**
     * Closes the k8s client if one was created and forgets it, so that a later call
     * to {@link #getK8sClient()} builds a fresh one.
     */
    private void closeK8sClient() {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

}
